云布道师
编者按:马阳阳-基础架构部实时平台 Flink 引擎资深开发工程师丁国涛-基础架构部实时平台 Flink 引擎资深开发工程师背景
以 Flink 为基础的实时计算在 B站有着广泛而深入的应用。目前 B站的 Flink 作业主要运行在三种集群环境下:纯物理机部署的 YARN 集群、为了提高 Kafka 集群资源利用率而和 Kafka 混部的 YARN 集群以及为了提高线上服务器而和 K8S 混部的 YARN 集群(这部分有计划在不远的将来使用Flink On K8S部署方式代替)。其中纯物理机 YARN 集群使用纯 SSD 盘的统一机型的服务器,包含 1000+ 台服务器;和 Kafka 混部的集群目前为 Flink 提供了 2000+ cores;和线上的 K8S 混部的集群已经使用了 6000+ cores,并且还在持续增加。在业务方向上,B站的 Flink 已经应用在了包括 AI、广告、数仓、数据传输和其它的很多业务上。目前 B站 Flink 作业的最大并行度为 2000。下图展示了 B站实时应用的整体架构及 Flink Runtime 的工作范围。正是由于在 B站 Flink 应用广泛,作业数量众多,很多作业的流量和并行度也很大,我们在使用 Flink 的过程中遇到了一些社区版本的 Flink 无法满足的功能。我们遇到的主要痛点如下:为了解决上述痛点,我们对 Flink Runtime 进行了很多的定制开发和改进。下面将从以下几个方面介绍一下 B站在 Flink Runtime 上所做的改进:Checkpoint 相关的改进
Checkpoint 作为 Flink 容错机制的基础,对 Flink 作业有着重要的意义。在解决实际生产问题和与用户交流的过程中,Checkpoint 相关的问题也占着极大的比重。为了更好地满足平台和用户的需求,我们在以下几个方面对 Checkpoint 做了大量的改进:可恢复性、Checkpoint 优化以及相关工具的开发。 可恢复性
1、改进的 Operator ID 生成算法在 Flink 中作业中经常出现的场景是随着流量或者计算复杂度变化,用户或者平台需要改变作业的并行度以增加处理能力。这种变化可能导致 Kafka Source 算子和下游算子的连接关系发生变化(例如 Kafka Topic 的 partition 数量为 50,并行度从 50 变为 100,这种情况下 Kafka Source 和下游的连接关系从 forward 变为 rebalance(社区原生)或者 rescale(B站改进))。社区原生的 Operator ID 生成算法中,计算一个算子的 Operator ID 时,会将其下游可以 chain 在一起的算子也考虑进去。在刚刚我们提到的场景中,由于 Kafka Source 可能从可以和下游算子 chain 在一起变为不能 chain,从而导致计算出的算子 ID 发生变化,进而导致作业无法从原来的 Checkpoint 恢复。为了解决上述问题,我们扩展了社区的算子 Operator ID 生成算法 StreamGraphHasherV2,引入了 StreamGraphHasherV3。在 StreamGraphHasherV3 中,在计算 Operator ID 时,不考虑算子和下游算子的连接关系,可以生成稳定的算子 Operator ID,很大地提升了因为作业并行度变化情况下的 Checkpoint 可恢复率。2、调整最大并行度的计算方法在 Flink 的状态中,最大并行度是一个重要的概念。为了从状态恢复时,不必拉取所有的状态文件,Flink 使用了类似一致性哈希的做法,将状态的键值做哈希后划分到一个固定的份数里,每个算子的一个并行度负责其中的一个范围。而这个需要将键值的哈希划分的份数就是由最大并行度决定的。Flink 中采用如下算法决定一个算子的最大并行度://如果用户手动设置了最大并行度,则使用用户设置的最大并行度
//否则按照如下算子计算最大并行度
Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
128),
32768);
在我们的生产场景中,经常遇到因为最大并行度变化导致无法从 Checkpoint 恢复的问题。为了解决该问题,我们的工作从两个方面进行。第一个方面调整最大并行度的计算方法,在本部分介绍;第二个方面对存量的状态,从状态中读取出原始的键值,根据新的最大并行度重新计算键值的 key group,并写入新的状态中,作业再从新的状态恢复,这部分工作将在下面介绍。
在新的最大并行度计算算法中,我们将最大并行度的最小值调整为 1024,并依据并行度的 10 倍按照原来的算法计算最大并行度。
Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo((operatorParallelism + (operatorParallelism / 2)) * 10),
1024),
32768);
3、基于 State Processor API 的 Key Group 重算方法
上面提到的方法可以解决新增作业在大多数情况下从 Checkpoint 恢复的需求,但是无法对存量作业起作用。对于存量作业,我们的思路是从 State 文件中反序列化出原始的键值,并根据反序列化出的键值重新计算所属的 key group,然后将结果写入新的状态文件,生成新的 Checkpoint,然后作业可以从新的 Checkpoint 恢复。基于上述思路,我们调研了社区的相关的工具,发现 State Processor API 可以部分地满足我们的需求。关于 State Processor API 的介绍及使用方式,可以参考社区相关文档,下面主要介绍一下我们为了实现自己的需求对 State Processor API 所做的扩展工作。分析之后,我们发现原生的 State Processor API 在如下几个方面不能满足的我们的需求:- State Processor API 提供的用户接口是基于单个算子的,而我们需要的是对 Checkpoint 的所有的状态进行读取并计算。
- State Processor API 中需要用户手动构造 StateDescriptor 并传入相关方法中。
- State Processor API 会同时反序列化 Key 和点查对应的 Value,性能上无法满足需求。
针对上述问题,我们的解决方案如下:
- 从 Checkpoint 的_metadata 文件中读取所有的 Operator ID,对每个 Operator ID 依次调用 State Processor API 提供的接口进行单个算子的计算。
- 在 Checkpoint 的元数据中加入必要的信息,在使用 State Processor API 前从 Checkpoint 的元数据中读取必要的信息生成调用 State Processor API 用户接口需要的 StateDescriptor。
- 修改 State Processor API 的实现方式,添加一种只会反序列化 key,而无需反序列化 value(将 value 作为字节数组看待)的方式。
基于上述修改,我们实现了数据的 key group 的重算,系统的整体架构如下图所示。经过重算的状态,算子的最大并行度可以满足恢复的需求。这样我们就解决了由于最大并行度变化导致的 Checkpoint 无法恢复的问题。
4、使用 Operator Name 辅助从 Checkpoint 恢复
由于上面提到的 Operator ID 生成算法会导致 Checkpoint 不兼容,无法全量应用在所有的作业中(新增作业可以应用,存量作业无法应用)。我们需要找到新的办法来解决算子间连接关系变化导致的 Checkpoint 不兼容问题。基于 B站 Flink 作业的特点,SQL 作业占了多数,且 SQL 作业中大多数情况下都不存在同名的 Operator Name,我们设计了一种新的方法来辅助从 Checkpoint 恢复,即在 Operator ID 冲突导致无法从 Checkpoint 恢复时,试着将 Operator Name 当做桥梁,将 Checkpoint 中的 Operator ID 和作业 DAG 中的 Operator ID 关联起来,从而实现将 Checkpoint 中的状态 Assign 给 DAG 中的算子。 Checkpoint 优化
Flink 社区提供了两种触发检查点的方式:Checkpoint 和 Savepoint。关于 Checkpoint 和 Savepoint 的区别,可以参考社区相关的文档。简单来说(对我们使用 RockDB State Backend,且开启了 Incremental Checkpoint),Checkpoint 和 Savepoint 的区别主要包括如下两个方面:- Checkpoint 为增量式的,且可以在单个 Task 内部利用多线程上传文件,而 Savepoint 把 RocksDB 的状态作为一个 Stream,单线程上传至 HDFS
- 由于我们开启了 Retain Checkpoint On Cancellation,Checkpoint 会依赖之前的运行实例生成的 Checkpoint,且可能存在较长的依赖链,导致之前的运行实例产生的 Checkpoint 必须保留在 HDFS 上,从而占用大量空间
基于上述 Checkpoint 和 Savepoint 的优缺点,我们提出了 Full Checkpoint 的概念。Full Checkpoint 综合了 Incremental Checkpoint 和 Savepoint 的优点:- Full Checkpoint 只上传增量的文件,且跟 Checkpoint 一样可以利用多线程减少 Checkpoint 完成需要的时间。
- 对依赖的之前的 Checkpoint 的文件,Full Checkpoint 会上传,从而使得 Full Checkpoint 生成的 Checkpoint 对之前运行实例的 Checkpoint 不存在依赖关系,便于 Checkpoint 的清理、迁移等。
根据上述思想,Full Checkpoint 涉及的主要组件和各组件的作用可以使用下图描述。Full Checkpoint 可以由用户或者平台通过 Flink Rest API 触发,我们也在接下来要介绍的重启接口中使用了 Full Checkpoint,并且计划使用 Full Checkpoint 替换 Savepoint 作为停止作业前的默认检查点实现方式。2、Regional Checkpoint在 Flink 作业中,有一类作业有着比较明显的特点:作业的 DAG 中没有或者几乎没有(关于这一点在下面的 Regional Checkpoint 的扩展中会详细解释)ALL-TO-ALL 的连接、对数据的准确性要求较高(这类作业一般都有很多下游作业)。在这类作业中,Checkpoint 扮演着很重要的角色,在数据重放时,不仅影响作业的 Exactly-Once 特性,还决定着要从 Kafka 拉取多少数据。对于并行度比较大的作业,受环境的影响(网络抖动、存储抖动等),会导致 Checkpoint 有比较大的概率失败。为了解决这类问题,我们根据这类作业的特点(几乎没有 ALL-TO-ALL 连接),借鉴了社区 Region-Failover 的思路,并参考了业界的实现,实现了称为 Regional-Checkpoint 的 Checkpoint 优化。所谓 Regional-Checkpoint,即将 DAG 划分为一个个的 Region,将 Region 作为相对独立的单元看待,Region 之间不产生相互影响。下图展示了几种典型场景下的 Region 划分:将 DAG 划分为 Region 之后,按照下表所示的逻辑对 Checkpoint 进行处理:
在实际的开发中,为了方便引入 Regional Checkpoint 相关的处理逻辑及减小对原生 Checkpoint 处理逻辑(上表中的 Global Checkpoint)的影响,我们抽象出了 CheckpointHandler 接口,将相关的公共逻辑放入了其抽象实现类 AbstractCheckpointHandler 中,并将 Global Checkpoint和Regional Checkpoint 的处理逻辑分别放在 GlobalCheckpointHandler 和 RegionalCheckpointHandler 中。用户可以通过参数来控制是否使用 Regional Checkpoint。 相关工具的开发
下面介绍一些我们为了方便 Checkpoint 相关运维和问题排查而开发的实用工具。1、定期状态文件清理上面介绍 Full Checkpoint 时提到过,我们开启了 Incremental Checkpoint 和 Checkpoint 的 Retain On Cancellation 功能。这会造成 Checkpoint 之间存在依赖关系,对 Checkpoint 的清理造成一定的影响,从而造成 Checkpoint 占用的空间增加,HDFS 存储成本增加。为此,我们引入定期执行的 Checkpoint 清理程序。其基本逻辑为遍历 Checkpoint 的存储目录,过滤掉设定的时间之前的 Checkpoint,找到剩下的每个 Checkpoint 的_metadata 文件,并从_metadata 文件中解析出对状态文件的引用,记录下来,之后将所有不再被引用的状态文件做清理。下图描述了清理流程。在 B站,我们增加了在作业启动时将 Operator ID 和 Operator Name 的映射信息打印到日志的功能来帮助排查 Flink 作业无法从 Checkpoint 恢复的问题,在线上问题排查中起到了很好的作用。但是由于 B站的 ES 日志索引保存时间为 14 天,我们也遇到了运行时间超过 14 天的作业在重新启动后无法从 Checkpoint 恢复时,无法找到 Operator ID 和 Operator Name 的映射关系,从而导致无法排查出原因的情况。
为了解决这个问题,我们将 Operator ID 和 Operator Name 的映射关系存入了 Checkpoint 的元数据中,通过增加了极少量的元数据存储,使得我们在任何时候都能获得 Operator ID 和 Operator Name 的映射关系。下图展示了加入额外的元数据后,_metadata 文件的存储结构。
在排查 Checkpoint 相关的问题时,查看 Checkpoint 的元数据(_metadata 文件的内容)是一个必要且十分有用的手段。为了方便地查看 Checkpoint 的元数据信息,我们开发了相关的工具来查看给定 Checkpoint(指向_metadata 文件)的元数据。该工具除了支持社区原生的元数据外,还支持上面提到的 B站自定义的元数据。用户可以提供参数来只是是否需要打印出所有的 Operator ID 和 Operator Name 的映射信息,也可以提供需要打印的 Operator 的 ID 或者 Name。可用性提升在 B站,有一类实时计算作业作为基础组件为其它所有的实时或离线计算作业提供基础,因此这类作业对可用性有很高的要求。最初这类作业使用 Flume 实现。随着公司实时计算的技术栈全面往 Flink 迁移,这类作业也有使用 Flink 的需求。为了满足这类计算作业的可用性需求,我们做了很多的工作,下面介绍其中主要的几点工作。 Hybrid HA
对于生产环境,社区版 Flink 提供了基于 Zookeeper(用于 YARN 和 Standalone)和 config map(用于 Kubernetes)的 HA 方案。B站的 Flink 作业部署和运行在 Yarn 环境中,基于 Zookeeper 的高可用方案是唯一可选的方案。但是根据 B站过往使用基于 Zookeeper 的 HA 的经验,在作业数量变大之后,Zookeeper 本身的不稳定性反而会造成作业的失败。为了提高作业的可用性,我们需要对社区基于 Zookeeper 的 HA 方案进行一定的改造。考虑到在 YARN 部署环境下,实际上并没有实际进行 Leader 选举,而 Leader 监听机制可以通过轮询来实现。基于这个现实,我们考虑实现基于 HDFS 的 HA 机制,同时考虑到如果全量作业都基于轮询机制请求 HDFS,会对 HDFS 的 namenode(或者 NNProxy server)造成较大的压力,我们提出了同时基于 Zookeeper 和 HDFS 的 Hybrid HA 机制。该机制的主要思想为在 Zookeeper 运行正常时,HA 机制基于 Zookeeper 运行,在 Zookeeper 发生异常时,为了保障作业仍能稳定运行,HA 机制切换到基于 HDFS 运行。由于数据可能会存在两个系统里,如何保障数据的一致性成为必须要考虑的问题。为此,我们对 HA 涉及到的数据进行了梳理,HA 要写入的数据包括:- ResourceManager、Dispatcher、JobMaster 的 address:JobManager 和 TaskManager 都会读取
- Checkpoint、CheckpointIDCounter:仅 JobManager 读取
对于仅被 JobManager 读取的数据,由于一个作业仅有一个,读取频率不高,仅写入 HDFS 即可。对于会被 TaskManager 读取的数据,读取频率很高,会对 HDFS 造成比较大的压力,需要同时写入 HDFS 和 ZK,ZK 正常时从 ZK 读取,只在 ZK 异常时从 HDFS 读取。综上,写入数据时要保证:- TaskManager 用到的数据要同时写入 ZK
将上述过程用图形表示如下:当 JM 地址成功写入 HDFS,但是写入 ZK 失败时,就会出现一致性问题。此时,从 HDFS 读取到的是最新的数据,而从 Zookeeper 读取到的是过期的数据。为了保证 leader 发现时的可用性和一致性,读数据要保证:解决不一致问题的常用手段就是加版本号信息,我们给写入 HDFS 和 ZK 的数据都加上版本号,只要两者的版本号一致就可以认为数据一致。Leader 选举和 Leader 发现的信息存储在 LeaderInformation 类中,每一次新的 Leader 选举都会生成一个新的 Leader Session ID,它是一个 UUID 随机字符串,我们选择把它作为数据的版本号。 Job Manager 恢复过程保持 Task 正常运行 (Reconcilation)
社区版本的 Flink 在 Job Manager 失败恢复(HA)的过程如下图所示。可以看到,Task Executor 通过高可用服务感知到 Leader 发生切换或 Task Executor 与 Job Master 之间的心跳超时的时候,会主动断开与 Job Master 的连接,在这个过程中会进行如下动作:- 将运行在该 Task Executor 上所有属于该 JobMaster 的 Tasks 取消(cancel)
- 将 Job 持有的 slots 状态从 ACTIVE 改为 ALLOCATED(也就是已分配但尚未给到 Job 使用)
根据我们对线上任务的统计,作业启动过程中,根据作业并行度的大小和作业运行图的复杂度,部署所有的 Task 可能花费从一百毫秒级到几十秒钟的时间,所以即使开启了 Job Manager HA,不考虑 Task 失败的情况下,作业仍然可能几十秒钟的不可用时间。而实际上,在 Task 正常运行过程中,除了 TaskExecutor 与 JobMaster 的心跳,Task 需要与 JobMaster 交互的信息非常之少(汇报 Checkpoint 的信息和 Accumulator 的信息,而多数任务是没有使用 Accumulator 的)。基于以上统计和分析,我们考虑在 Job Manager HA 的过程中保持 Task 正常运行。为了达到上述目标,需要同时对 JobMaster 端和 TaskExecutor 端进行改造。经过分析相关代码,需要进行如下改造。- 取消 TaskManager 由于 JobManager 的 Leadership 变动或者心跳超时就 Cancel Task 的行为TaskManager 与新的 JobManager Leader 建立连接后通过心跳上报 Job 对应的 Slot 信息和 Task 的运行状态,以便 JobManager 恢复 ExecutionGraph 和 SlotPool
- 对 ExecutionGraph 核心数据进行快照,实现一个 FileSystem Store 远端存储快照,基于该快照初始化一个用于恢复的 ExecutionGraph 对象
- ExecutionGraph 内部实现恢复 Job 的逻辑(称为 RECONCILING 状态),主要是 Job 和 Task 相关状态的恢复
- 实现 Job Master Failover 之后基于快照和 Task Executor 上报的信息来恢复 Job 的过程(RECONCILING 过程)
上述过程可以使用图形表示如下:除上述主流程的改造之外,我们还需要确保在 TaskExecutor 与老的 JobMaster 断开连接到与新的 JobMaster 重新建立连接之间 Task Executor 到 JobMaster 的 RPC 能够得到正确的处理。 Regional Checkpoint 适配 HDFS Sink
上面介绍了我们引入 Regional Checkpoint 的背景,这个背景跟写入 HDFS 的作业也很符合,但是 HDFS Sink 作业却无法应用 Regional Checkpoint,原因在于 HDFS 中存在一个单并行度的算子:StreamingFileCommitter。正式由于该算子的存在,导致作业中会存在 ALL-TO-ALL 连接,从而导致整个作业会被划分为一个 Region,进而导致 Regional Checkpoint 没有效果。考虑到写 HDFS 的作业的数量及其重要性,我们将 Regional Checkpoint 对 HDFS Sink 进行了适配。HDFS Sink 场景中的一个典型的 DAG 如下图所示(图中还展示了 Region 划分的方式):为了在该场景下应用 Regional Checkpoint,我们修改了该场景下的 Region 划分方式,将 Source 到 FileWriter 的链路划分为一个 Region,并将 FileCommitter 划入每一个 Region 中。考虑到实现的通用性,我们为 FileCommitter 这种可以通过被不同 Region 重复包含而实现在含有单并行度算子中将 DAG 划分为多个 Region 的算子引入了一个新的接口 RegionSharable,在开启了 Regional Checkpoint 的配置且系统识别到 DAG 中包含实现了 RegionSharable 算子的情况下,系统可以自动实现将该算子划分到之前的每一个 Region,并在适当的时机调用该算子通过 RegionSharable 提供的方法来通知该算子必要的信息。RegionSharable 接口及相关数据的结构的定义如下:
public interface RegionShareOperator {
void notifyRegionalCkComplete(RegionCheckpointFailedDetail detail);
}
public class RegionCheckpointFailedDetail {
long checkpointId;
FailedScope scope;
Set failedTasks;
enum FailedScope {
UP_STREAM,
DOWN_STREAM,
BOTH
}
enum FailedType {
ERROR,
EXPIRED
}
static class FailedTaskInfo {
int subTaskId;
String taskName;
FailedType reason;
String message;
}
}
运行时,CheckpointCoordinator在Source到FileWriter链路上发生Checkpoint失败时,会找到FileCommitter所在Execution,从而找到FileCommitter所在的Task Manager,并通过Task Manager Gateway通知Checkpoint Region的失败情况,FileCommitter会依据自身业务逻辑进行相应的处理,来保障数据处理的Exactly-Once语义。 为 SQL 作业应用 Rescale Partitioner
社区版本的 Flink 中提供了 Region Failover 的功能,在 Region 中的 Task 失败后,可以只重启对应的 Region,而不用重启整个作业或者作业中的所有 Tasks。在 Kafka 的 Partition 数量和作业默认并行度一致的情况下,作业使用 Forward 模式,DAG 划分为和 Kakfa Partition 数量一致的 Region,可以充分利用 Region Failover 减少需要重启的 Task 数量,从而提高作业的可用性。在 B站内部,为了防止用户手动设置 Source 算子的并行度导致的问题(算子空跑、性能问题等),我们禁用了用户设置 Source 算子并行度的功能。最初的设计中,我们提供两种连接方式。在 Source 算子消费的 Kafka topic 的 partition 数量和作业默认并行度一致时,使用 Forward 连接方式;在 Source 算子消费的 Kafka topic 的 Partition 数量和作业默认并行度不一致时,使用 Rebalance 连接方式。实际生产环境中经常会出现 Kafka topic 的 Partition 数量和作业默认并行度不一致的场景,会在 Source(Source 算子+Calc+可选的 Watermark Assigner)和后续算子间使用 Rebalance 模式,进而整个作业的 DAG 被划分到一个 Region 中,导致任意一个 Task 失败都会导致所有的 Tasks 需要重启。为了解决该问题,我们在 SQL 作业中也引入了 Rescale 模式。Rescale 模式的典型连接方式如下图所示:其中也标示了对应场景中的 Region 划分方式,可以看到在一个 Task 失败时,并不会重启所有的 Sub-task,对可用性提升会有较大的提升作用。注意到使用 Rescale 模式的前提条件是 Kafka 的 Partition 数量和全局并行度成倍数关系,这导致虽然上述改动可以覆盖更多的场景,但是仍无法满足所有的需要。为了覆盖更多的传输作业场景,我们引入了 force-rescale 参数,该参数会在 Source 并行度和全局并行度不是倍数关系时,强制使用 Rescale 模式。通过使用该参数,用户可以根据业务需求在性能和可用性之间做取舍。 单点恢复(Approximate Local Recovery)
在实时计算的应用场景中,有一类作业有如下特点:- 计算中主要的计算为双流或多流join逻辑,算子间会产生 ALL-TO-ALL 连接
- 由于作业并行度大,作业在失败恢复时,需要花费的时间较长
在 B站,满足上述特点的作业包括商业化部门和 AI 部门的模型训练作业和样本拼接作业。由于作业的算子间存在 ALL-TO-ALL 连接,即使开启了 region failover,在任意 task 失败时也会发生全量 failover,造成数据较长时间的不可用。基于业务对数据一致性要求不高的特点,参考业界的一些分享,我们开发了单点恢复的功能。一个典型的 Flink task(不包括 source task(没有上游 task)和 sink task(没有下游 task))会包含上游 task 和下游 task,一个 task 会和它的上下游 task 产生数据交换。这种数据交换既可能发生在相同的 task manager 内部,也可能发生在不同的 task manager 之间。下图显示了 Flink 中的网络模型,其中上侧的图为跨 Task Manager 进行数据交换的图示,下侧的图则简化展示了在同一个 Task Manager 内部的数据交换的图示。为了实现单点恢复,需要进行以下处理:- 在 task 失败后,只重启失败的 task,并在多个 task 发生重启时,保证 task 按照期望的顺序启动
- 上下游 task 感知到 task 的失败,并在 task 重启前进行正确的处理
- Task 重启后,上下游 task 能正确与重启后的 task 实现数据交互
其中还有一个细节值得注意,为了吞吐量考虑,Flink 会将多个序列化后的 Record 放入同一个 Buffer 中进行发送,也导致了一个序列化后的 Record 可能会存在于两个或者多个 Buffer 中。在 Task 失败和重启后,需要对这部分 Buffer 进行细致的处理,否则可能导致 Record 反序列化失败,从而导致 Task 重启后失败。下面我们分别介绍对应的工作。1、Task 的重启Flink 对 task 失败后应该要进行哪些 task 的重启进行了良好的抽象,我们只需要实现 Flink 提供的接口 FailoverStrategy,在其方法 getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause)的实现中只返回传入的 task,并实现对应的 Factory 即可。由于可能同时有多个 task 失败(例如由于 task manager 被 kill),而 Flink 并不保证调用重启 task 的顺序,这样会可能会导致下游 task 先于上游 task 被启动,从而造成下游 task 视图找到消费的 partition 时发生失败,导致 task 启动失败。Flink 作业在启动时也可能发生类似的情况,其处理方法为在 Task 获取消费的 partition 时,进行重试,并从 Job Master 拉取 partition 最新信息。为了降低实现的复杂度,我们选择在 Job Master 端启动 task 时进行一些控制,从而保证下游的 task 在启动时,上游的 task 已经启动。具体的处理方法为,下游 task 在启动时,如果发现其上游的 task 还未启动,则先从启动过程退出,并注册对上游 task 启动状态的监听,在其监听的所有上游 task 都启动成功之后,再恢复启动过程。2、Task 失败的感知及对应的处理
Task 可能会因为以下的原因失败:- 单个 Task 由于本身的运行逻辑或遇到脏数据未能正确进行容错等原因失败
- TaskManager 被 kill,导致运行在该 task manager 上的所有 task 失败
- 机器断电宕机,导致该机器上的所有 task manager 上的所有 task 失败
针对上述类型的任务失败,其上下游 task 感知到其失败的方式分别对应如下:- Task 自身的清理逻辑会在 task 失败后清理对应的网络资源,会向上游 task 发送 channel close 信息,并向下游 task 发送 exception
- Task manager 被 kill 后,其持有的 TCP 连接会进行关闭,上游的 Netty Server 和下游 Netty Client 会感知到,并通过回调函数通知对应的 task
- 机器宕机的情况下,上下游 task 无法通过 Netty 感知到对应网络连接的失败,我们通过 Job manager 来实现对上下游 taskd 的通知 h4. 不完整记录的处理
在上述背景中介绍了不完整记录产生的背景和可能造成的问题。为了解决上述问题,我们需要在上游 task 感知到下游 task 失败和下游 task 重新连接后进行必要的处理。同时,需要下游 task 感知到上游 task 失败后,也需要进行必要的处理。对以不完整记录开头的 Buffer 的处理参考了社区的方案,将 BufferConsumer 和不完整记录的长度封装到 BufferConsumerWithPartialLength 中。在上游 task 感知到下游 task 失败重连之后,会将是否需要清楚 partitial record 的标志位设置为 true,在将 BufferConsumer 的数据拷贝进入网络栈前,依据该标志位进行不同的动作,如果该标志位为 true,会将不完整记录的长度跳过,如果跳过的长度为 BufferConsumer 的长度,则说明该记录可能跨越多个 BufferConsumer,仍然保持该标志位为 true,以便在处理下一个 BufferConsumer 时仍然进行相同的判断;如果跳过的长度小于 BufferConsumer 的长度,则会将标志位置为 false。下游 task 通过网络栈接收到数据之后,在 StreamTaskNetworkInput 中,会将数据缓存在 RecordDeserializer 中,拿到完整记录的数据(可能跨越多个 buffer),将记录反序列化之后,交给 task 线程去进行后续的处理。因此在上游 task 失败,下游 task 感知到之后,可以通过将 RecordDeserializer 中还未反序列化的 buffer 中的数据清除来实现对 buffer 结尾处的不完整记录的处理。这里有一个问题需要注意,StreamTaskNetworkInput 位于 task 层,和感知上游 task 失败的网络层处于不同的处理线程中。为了让 StreamTaskNetworkInput 感知到上游 task 的失败,我们选择了按照当前 Flink 的线程模型来处理,即网络层感知到上游 task 失败之后,通过向 task 传递一个 ChannelUnavailableEvent 事件。这样可以不用在 task 层和网络层之间添加复杂的同步操作,否则可能会影响 Flink 数据处理的性能。h3. 下游 task 恢复之前,对应上游 task 丢弃相应的数据Flink 依赖下游的 Netty client 及时读取上游 task 产生的数据来维持上游 task 的 buffer 占用维持在较低的水平(不能及时拉取时会出现反压)。下游 task 失败时,将停止从上游 task 拉取数据,如果不做任何处理,将会导致上游 task 的内存占用过高,最终导致全部上游 task 反压。为了解决上述问题,有两种思路:- 上游 task 感知到下游 task 失败之后,直接将发送到相应 task 的数据丢弃
- 上游 task 将待发送的数据顺序写入磁盘文件,待下游 task 恢复之后,上游 task 先从磁盘读取数据发送给下游 task,待磁盘数据读取完毕之后,再恢复内存发送数据
- 第二种方案实现较复杂,我们目前采用了第一种实现方式。具体做法为在 PipelinedApproximateSubpartition 中设置一个标志位(PipelinedApproximateSubpartition 是 PipelinedSubpartition 的子类,其中添加了实现单点恢复逻辑需要的功能),在上游 task 感知到下游 task 失败之后,会将该标志位设置为 true。在将数据通过 PipelinedSubpartition#add(BufferConsumer bufferConsume, ...)添加到 subpartition 时,如果发现该标志位为 true,则直接调用 BufferConsumer#close() 将数据丢弃并返回。h2. Task 的恢复
从上面的介绍可以知道,job manager 会重启失败的 task,我们需要保证 task 重启后,与其有连接的上下游 task 都能正确地恢复数据的发送或接收。3、上游 Task 恢复的动作
在开启单点恢复后,数据发送端会切换为 PipelinedApproximateSubpartition 实现,其中维护了 available 字段,我们需要在下游 task 恢复之后,将其从 false 设置为 true,以恢复数据的发送。4、下游 Task 恢复的动作
- 失败的上游 task 重启后,job manager 会送 RPC 到 task manager,通知对应的下游 task。在下游 task 接收到通知后,需要重新建立与上游 task 的数据联系。主要的逻辑位于 SingleInputGate 中,收到通知后,SingleInputGate 会视图重新初始化 InputChannel 并替换之前维护的 InputChannel,并在初始化完毕后,通过 InputChannel 重新请求上游的 subpartition。这里有两个问题需要注意:
- 对和上游 task 同时失败的 task,其有两个途径进入 InputChannel 的初始化路径。一是我们刚刚介绍的 job manager 发送的通知,二是其启动时得到的信息。我们需要注意对这两种情况进行必要的处理。
- 在上游 task 失败前,下游 task 可能处于严重的堆积状态或者阻塞状态,导致其接收的数据迟迟无法处理,从而导致其接收到 job manager 的通知时,发现相应的 InputChannel 处于 available 状态。对这种情况,我们的处理是,先将信息缓存起来,等待消费到 ChannelUnavailableEvent 后,再进行 InputChannel 的重建工作。同时,我们设置一个定时器,若定时器超时时,仍然没有消费到 ChannelUnavailableEvent,我们将对应的 task 直接置为失败。
其他优化下面介绍一些 B站在 Runtime 上其它方面取得的一些进展。 基于 Backlog 负载均衡
Flink 的 Rescale Partitioner 和 Rebalance Partitioner 默认会使用 Round-robin 的方式,把数据发送到下游 Task 的 Channel 中,其中涉及到的组件如下图所示。在生产环境中经常遇到的问题是,由于环境波动,下游 Task 的 Sub-task 处理能力会出现不均衡,最终会导致导致整个上游 Task 发生反压。在 Rescale 和 Rebalance 模式下,每一条数据并没有特定的指向性,可以发送给下游的任意 Sub-task,因此如果可以根据下游 Sub-task 的负载动态分发数据的话,将可以改善上游 Task 的反压状况。基于上述思想,考虑到在 Flink Credit-based Flow Control 机制中,Backlog Size 用来反应下游 sub-task 的处理负载(Backlog Size 越大,说明下游消费能力越差),我们引入了基于 Backlog Size 的动态负载均衡机制来代替社区原生的 Round-robin 的方式。该方案的整体架构如下:LoadBasedChannelSelector 为我们新引入的类,其实现了 ChannelSelector,用来替换社区实现的 RoundRobinChannelSelector。其主要功能在一个可替换的抽象类 BacklogLoadBasedStrategy 中实现。BacklogLoadBasedStrategy 通过监听器监听 ResultPartition 中的 Backlog Size 变化,并根据 Backlog Size 的变化动态地改变维护的状态,用来决定如何为一条数据选择下游的 Channel。 大规模集群运维优化
B站的实时 YARN 集群中有一千多台机器,经常会有因为内存/磁盘故障或者更新操作系统等需求要下线机器的需求。由于 Flink 作业 7X24 运行的特性,如果直接下线机器的话,会对用户体验造成很坏的影响。为了使得这种运维操作更加平滑,我们设计了如下的流程:- 将待运维的机器列表从 YARN 摘掉 label(防止运维过程中有新的作业或者已有作业重新部署到这些机器上)
- 对其中的每一个作业,调用 Flink 提供的带黑名单的接口将该作业重启,并验证重启成功
在上述流程中,有一个重要的步骤是调用 Flink 提供的接口来重启作业,且该接口可以提供一个机器列表作为黑名单,在该列表中的机器上的资源将被 Flink 资源管理器忽略。跟我们添加的其它接口一样,重启接口需要通过 Flink 提供的 Rest API 调用。如果在调用重启接口时提供了机器的主机名列表作为参数,分布在该机器上的可用 slots 不会被分配给重启的作业,并且如果没有足够的资源来重启作业,Flink 在向 YARN 申请资源时也会将这些机器的主机名作为黑名单,防止 YARN 分配位于这些机器上的资源。下图为重启接口实现的整体架构及相关组件的交互过程(注意:图中并未画出与 YARN 的交互过程)。其他优化下面介绍一些 B站在 Runtime 上其它方面取得的一些进展。 增加更多 Checkpoint 恢复的兼容场景
我们已经做了很多工作来实现 Checkpoint 的兼容性,但是在实际场景中还有很多需要兼容的场景,例如聚合计算时,增加或者减少聚合指标的场景、异步维表 join 场景下结果表的字段增减的场景等,后续我们也会对这些场景提供支持。 HDFS Sink适配 Region Failover
上面介绍过 HDFS Sink 适配 Regional Checkpoint 的工作。基于这个思想,我们计划进行 HDFS Sink 对 Region Failover 的适配,提升 HDFS Sink 作业的可用性。 实现无重启的扩缩容
在实时计算中,因为业务增加或者突然的流量增加或者减少,对作业进行扩缩容是很常见且非常必要的操作。目前为了对一个作业进行扩缩容,我们需要先将作业下线,重新配置参数(并行度)后,再将作业进行提交。根据作业并行度和状态的大小,这会花费分钟级的时间。对一些作业来说,这是不可接受的,对于可以接受这个延迟的作业,也希望能将扩缩容的时间尽量降低。基于这个背景,我们会提供无需重启作业的扩缩容操作。参考文献:[1] https://www.infoq.cn/article/88iajgkazdxw5hut-joh
[2] https://www.infoq.cn/article/idw_igykly724yhkgqbk
[3] https://developer.aliyun.com/article/774837
[4] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/libs/state_processor_api/
你可能还想看关注我们欢迎关注加星标✨ 回复关键词可领取相关技术白皮书
随机抽取送技术图书 · 重大节日发放文创纪念品